目录
简介
ExecGo 是一个使用纯 Go 标准库构建的极简 AI 执行引擎,专为 AI Agent 提供任务提交、DAG 调度、并发执行和可观测性的 HTTP 服务。本文档专注于任务图验证机制,详细说明 TaskGraph.Validate() 方法的实现原理,包括空任务图检查、任务 ID 唯一性验证、必需字段完整性检查,以及依赖关系验证机制(包括未知依赖引用检测、自依赖检测、循环依赖检测算法)。
项目结构
ExecGo 采用清晰的分层架构设计,主要包含以下核心模块:
graph TB
subgraph "应用入口"
MAIN[cmd/execgo/main.go]
end
subgraph "API 层"
API[internal/api/handler.go]
end
subgraph "业务逻辑层"
MODELS[internal/models/task.go]
SCHEDULER[internal/scheduler/scheduler.go]
EXECUTOR[internal/executor/executor.go]
end
subgraph "基础设施层"
STATE[internal/state/state.go]
OBS[internal/observability/observability.go]
CONFIG[internal/config/config.go]
end
MAIN --> API
API --> MODELS
API --> SCHEDULER
API --> EXECUTOR
SCHEDULER --> STATE
SCHEDULER --> EXECUTOR
API --> OBS
API --> CONFIG
图表来源
- main.go:1-105
- handler.go:1-157
- task.go:1-149
- scheduler.go:1-231
章节来源
- main.go:1-105
- README.md:149-177
核心组件
TaskGraph 数据结构
TaskGraph 是一次提交的任务 DAG,包含任务列表和验证方法:
classDiagram
class Task {
+string ID
+string Type
+json.RawMessage Params
+[]string DependsOn
+int Retry
+int64 Timeout
+TaskStatus Status
+json.RawMessage Result
+string Error
+time.Time CreatedAt
+time.Time UpdatedAt
}
class TaskGraph {
+[]*Task Tasks
+Validate() error
}
class TaskStatus {
<<enumeration>>
pending
running
success
failed
skipped
}
TaskGraph --> Task : "contains"
Task --> TaskStatus : "uses"
图表来源
- task.go:21-39
验证流程
任务图验证采用多阶段检查机制,确保任务图的完整性和有效性:
flowchart TD
Start([开始验证]) --> CheckEmpty["检查任务图是否为空"]
CheckEmpty --> Empty{"是否为空?"}
Empty --> |是| ReturnEmpty["返回空任务图错误"]
Empty --> |否| CheckFields["检查必需字段"]
CheckFields --> FieldValid{"字段是否有效?"}
FieldValid --> |否| ReturnFieldError["返回字段错误"]
FieldValid --> |是| CheckUnique["检查任务 ID 唯一性"]
CheckUnique --> Unique{"ID 是否唯一?"}
Unique --> |否| ReturnDuplicate["返回重复 ID 错误"]
Unique --> |是| CheckDeps["检查依赖关系"]
CheckDeps --> DepsValid{"依赖是否有效?"}
DepsValid --> |否| ReturnDepError["返回依赖错误"]
DepsValid --> |是| CheckCycle["检查循环依赖"]
CheckCycle --> Cycle{"是否存在循环?"}
Cycle --> |是| ReturnCycle["返回循环依赖错误"]
Cycle --> |否| Success["验证成功"]
ReturnEmpty --> End([结束])
ReturnFieldError --> End
ReturnDuplicate --> End
ReturnDepError --> End
ReturnCycle --> End
Success --> End
图表来源
- task.go:41-79
章节来源
- task.go:21-79
架构概览
ExecGo 的验证机制在整个系统架构中的位置如下:
sequenceDiagram
participant Client as "客户端"
participant API as "API 处理器"
participant Validator as "TaskGraph.Validate()"
participant Scheduler as "调度器"
participant Executor as "执行器"
Client->>API : POST /tasks (JSON)
API->>API : 解析请求体
API->>Validator : graph.Validate()
Validator->>Validator : 检查空任务图
Validator->>Validator : 检查必需字段
Validator->>Validator : 检查 ID 唯一性
Validator->>Validator : 检查依赖关系
Validator->>Validator : 检查循环依赖
alt 验证失败
Validator-->>API : 返回错误
API-->>Client : 400 Bad Request
else 验证成功
Validator-->>API : 返回 nil
API->>Scheduler : Submit(graph)
Scheduler->>Scheduler : 构建依赖图
Scheduler-->>Client : 202 Accepted
end
图表来源
- handler.go:58-99
- task.go:41-79
详细组件分析
TaskGraph.Validate() 方法实现
TaskGraph.Validate() 方法实现了完整的任务图验证逻辑,采用分阶段验证策略:
1. 空任务图检查
验证过程首先检查任务图是否为空,这是最基本的验证步骤:
flowchart TD
Start([Validate 方法调用]) --> CheckLen["len(g.Tasks) == 0"]
CheckLen --> IsEmpty{"长度为 0 ?"}
IsEmpty --> |是| ReturnEmpty["返回 'task graph is empty' 错误"]
IsEmpty --> |否| InitMap["初始化 ID 映射"]
ReturnEmpty --> End([结束])
InitMap --> CheckRequired["遍历每个任务检查必需字段"]
图表来源
- task.go:42-59
2. 必需字段完整性检查
对于每个任务,验证以下必需字段:
ID: 任务唯一标识符Type: 任务类型,必须是非空字符串
flowchart TD
CheckRequired --> LoopTasks["遍历 g.Tasks"]
LoopTasks --> CheckID["t.ID == ''"]
CheckID --> HasID{"ID 为空?"}
HasID --> |是| ReturnIDReq["返回 'task id is required' 错误"]
HasID --> |否| CheckType["t.Type == ''"]
CheckType --> HasType{"Type 为空?"}
HasType --> |是| ReturnTypeReq["返回 'task %q: type is required' 错误"]
HasType --> |否| NextTask["检查下一个任务"]
ReturnIDReq --> End([结束])
ReturnTypeReq --> End
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| CheckUnique
图表来源
- task.go:48-58
3. 任务 ID 唯一性验证
使用哈希映射确保每个任务 ID 在任务图中唯一:
flowchart TD
CheckUnique --> InitMap["ids := make(map[string]bool, len(g.Tasks))"]
InitMap --> LoopTasks["for _, t := range g.Tasks"]
LoopTasks --> CheckDup["ids[t.ID]"]
CheckDup --> Duplicate{"ID 已存在?"}
Duplicate --> |是| ReturnDup["返回 'duplicate task id: %q' 错误"]
Duplicate --> |否| AddID["ids[t.ID] = true"]
AddID --> NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| CheckDeps
ReturnDup --> End([结束])
图表来源
- task.go:47-58
4. 依赖关系验证
依赖关系验证包含两个关键检查:未知依赖引用检测和自依赖检测。
4.1 未知依赖引用检测
检查每个任务的 DependsOn 数组,确保所有依赖任务 ID 都存在于任务图中:
flowchart TD
CheckDeps --> LoopTasks["遍历 g.Tasks"]
LoopTasks --> LoopDeps["遍历 t.DependsOn"]
LoopDeps --> CheckUnknown["!ids[dep]"]
CheckUnknown --> Unknown{"依赖 ID 不存在?"}
Unknown --> |是| ReturnUnknown["返回 'task %q depends on unknown task %q' 错误"]
Unknown --> |否| CheckSelf["dep == t.ID"]
CheckSelf --> SelfDep{"依赖等于自身?"}
SelfDep --> |是| ReturnSelf["返回 'task %q cannot depend on itself' 错误"]
SelfDep --> |否| NextDep["下一个依赖"]
NextDep --> MoreDeps{"还有依赖?"}
MoreDeps --> |是| LoopDeps
MoreDeps --> |否| NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| CheckCycle
ReturnUnknown --> End([结束])
ReturnSelf --> End
图表来源
- task.go:62-71
4.2 自依赖检测
防止任务直接或间接依赖自身,这会导致循环依赖问题。
5. 循环依赖检测算法
使用 Kahn 算法进行拓扑排序检测,确保任务图没有循环依赖:
flowchart TD
CheckCycle --> InitArrays["初始化 inDegree 和 dependents 映射"]
InitArrays --> BuildGraph["构建入度图和反向依赖图"]
BuildGraph --> CountDeps["for _, t := range tasks"]
CountDeps --> IncDeg["inDegree[t.ID]++"]
IncDeg --> AddDep["dependents[dep] = append(dependents[dep], t.ID)"]
AddDep --> NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| CountDeps
MoreTasks --> |否| InitQueue["初始化队列,放入入度为 0 的节点"]
InitQueue --> BFS["BFS 拓扑排序"]
BFS --> VisitNode["visited++"]
VisitNode --> DecDeg["for _, child := range dependents[node]"]
DecDeg --> Decrement["inDegree[child]--"]
Decrement --> CheckZero{"inDegree[child] == 0?"}
CheckZero --> |是| Enqueue["queue = append(queue, child)"]
CheckZero --> |否| Continue["继续"]
Enqueue --> Continue
Continue --> MoreQueue{"队列是否为空?"}
MoreQueue --> |否| BFS
MoreQueue --> |是| CheckVisited["if visited != len(tasks)"]
CheckVisited --> HasCycle{"访问数量不等于任务数量?"}
HasCycle --> |是| ReturnCycle["返回 'cycle detected in task graph' 错误"]
HasCycle --> |否| NoCycle["无循环依赖"]
ReturnCycle --> End([结束])
NoCycle --> End
图表来源
- task.go:81-121
章节来源
- task.go:41-121
API 层集成
API 处理器在接收到任务提交请求后,会调用 TaskGraph.Validate() 方法进行验证:
sequenceDiagram
participant Client as "客户端"
participant Handler as "handleSubmitTasks"
participant Graph as "TaskGraph"
participant Validator as "Validate()"
participant Executor as "执行器验证"
participant Scheduler as "调度器"
Client->>Handler : POST /tasks
Handler->>Handler : 解析 JSON 请求体
Handler->>Graph : 创建 TaskGraph 实例
Handler->>Validator : graph.Validate()
alt 验证失败
Validator-->>Handler : error
Handler->>Handler : 记录警告日志
Handler-->>Client : 400 Bad Request + 错误信息
else 验证成功
Validator-->>Handler : nil
Handler->>Executor : 验证任务类型
alt 执行器不存在
Executor-->>Handler : error
Handler-->>Client : 400 Bad Request + 可用类型列表
else 执行器存在
Executor-->>Handler : 成功
Handler->>Scheduler : Submit(graph)
Handler-->>Client : 202 Accepted + 任务 ID 列表
end
end
图表来源
- handler.go:58-99
章节来源
- handler.go:58-99
调度器集成
调度器在接收验证通过的任务图后,会构建内部的数据结构来支持并发执行:
flowchart TD
Submit([Submit 方法调用]) --> Lock["s.mu.Lock()"]
Lock --> LoopTasks["遍历 graph.Tasks"]
LoopTasks --> SetStatus["task.Status = StatusPending"]
SetStatus --> PutState["s.state.Put(task)"]
PutState --> IncMetrics["s.metrics.TasksTotal.Add(1)"]
IncMetrics --> IncType["s.metrics.IncType(task.Type)"]
IncType --> BuildDep["构建依赖计数和反向依赖图"]
BuildDep --> DepCount["s.depCount[task.ID] = len(task.DependsOn)"]
DepCount --> LoopDeps["for _, dep := range task.DependsOn"]
LoopDeps --> AddDep["s.dependents[dep] = append(s.dependents[dep], task.ID)"]
AddDep --> NextTask["下一个任务"]
NextTask --> MoreTasks{"还有任务?"}
MoreTasks --> |是| LoopTasks
MoreTasks --> |否| EnqueueReady["将入度为 0 的任务加入就绪队列"]
EnqueueReady --> Unlock["s.mu.Unlock()"]
Unlock --> Done([完成])
图表来源
- scheduler.go:69-97
章节来源
- scheduler.go:69-97
依赖关系分析
组件耦合度分析
ExecGo 的验证机制展现了良好的模块化设计:
graph TB
subgraph "验证相关组件"
MODELS[models/task.go<br/>TaskGraph.Validate]
API[api/handler.go<br/>handleSubmitTasks]
SCHEDULER[scheduler/scheduler.go<br/>Submit]
EXECUTOR[executor/executor.go<br/>Get/RegisteredTypes]
end
subgraph "外部依赖"
NET[net/http<br/>HTTP 处理]
LOG[slog<br/>日志记录]
TIME[time<br/>时间处理]
end
API --> MODELS
API --> EXECUTOR
API --> SCHEDULER
MODELS --> TIME
SCHEDULER --> EXECUTOR
SCHEDULER --> LOG
API --> NET
API --> LOG
图表来源
- task.go:1-149
- handler.go:1-157
- scheduler.go:1-231
- executor.go:1-68
错误处理策略
验证机制采用了统一的错误处理策略:
| 验证阶段 | 错误类型 | HTTP 状态码 | 错误消息格式 |
|---|---|---|---|
| 空任务图检查 | EmptyGraphError | 400 | "task graph is empty" |
| 必需字段检查 | RequiredFieldError | 400 | "task id is required" 或 "task %q: type is required" |
| ID 唯一性检查 | DuplicateIDError | 400 | "duplicate task id: %q" |
| 依赖引用检查 | UnknownDepError | 400 | "task %q depends on unknown task %q" |
| 自依赖检查 | SelfDepError | 400 | "task %q cannot depend on itself" |
| 循环依赖检查 | CycleError | 400 | "cycle detected in task graph" |
| 执行器类型检查 | UnknownTypeError | 400 | "unknown task type: %q (available: %s)" |
章节来源
- task.go:42-79
- handler.go:70-85
性能考虑
时间复杂度分析
- 空任务图检查: O(1) - 单次数组长度检查
- 必需字段检查: O(n) - n 为任务数量
- ID 唯一性检查: O(n) - 哈希映射插入和查找
- 依赖关系检查: O(n + d) - n 为任务数量,d 为依赖关系总数
- 循环依赖检测: O(n + d) - Kahn 算法的标准复杂度
总时间复杂度为 O(n + d),空间复杂度为 O(n + d)。
优化建议
- 早期短路: 验证过程采用早期短路策略,一旦发现错误立即返回
- 内存复用: 使用预分配的哈希映射减少内存分配开销
- 单次遍历: 循环依赖检测与依赖关系检查结合,避免重复遍历
故障排除指南
常见验证失败场景
1. 空任务图错误
症状: 提交空的任务数组 解决方案: 确保任务数组至少包含一个任务
2. 缺少必需字段
症状: 任务缺少 id 或 type 字段
解决方案: 为每个任务提供唯一且非空的 id 和有效的 type
3. 重复任务 ID
症状: 多个任务使用相同的 id
解决方案: 确保每个任务的 id 在整个任务图中唯一
4. 未知依赖引用
症状: 任务依赖不存在的任务 ID 解决方案: 确保所有依赖的任务 ID 都存在于任务图中
5. 自依赖问题
症状: 任务直接依赖自身 解决方案: 移除自依赖关系
6. 循环依赖
症状: 任务之间形成循环依赖链 解决方案: 重新设计任务依赖关系,打破循环
调试技巧
- 启用详细日志: 查看 API 层的日志输出了解验证失败的具体原因
- 逐步验证: 先验证基本字段,再检查依赖关系
- 使用健康检查: 通过
/health端点确认服务正常运行
章节来源
- handler.go:69-74
- task.go:42-79
结论
ExecGo 的任务图验证机制通过多层次、渐进式的验证策略,确保了任务图的完整性和有效性。该机制具有以下特点:
- 全面性: 覆盖了从基础字段验证到高级依赖关系分析的所有必要检查
- 高效性: 采用优化的算法和数据结构,确保验证过程快速执行
- 可维护性: 清晰的模块化设计和统一的错误处理策略
- 用户友好: 提供详细的错误信息,帮助开发者快速定位和解决问题
通过 TaskGraph.Validate() 方法,ExecGo 为 AI Agent 提供了一个可靠的任务执行基础,确保复杂的工作流能够在保证正确性的前提下高效执行。